Crate piper

source ·
Expand description

A bounded single-producer single-consumer pipe.

This crate provides a ring buffer that can be asynchronously read from and written to. It is created via the pipe function, which returns a pair of Reader and Writer handles. They implement the AsyncRead and AsyncWrite traits, respectively.

The handles are single-producer/single-consumer; to clarify, they cannot be cloned and need &mut access to read or write to them. If multiple-producer/multiple-consumer handles are needed, consider wrapping them in an Arc<Mutex<...>> or similar.

When the sender is dropped, remaining bytes in the pipe can still be read. After that, attempts to read will result in Ok(0), i.e. they will always ‘successfully’ read 0 bytes.

When the receiver is dropped, the pipe is closed and no more bytes and be written into it. Further writes will result in Ok(0), i.e. they will always ‘successfully’ write 0 bytes.

Version 0.2.0 Notes

Previously, this crate contained other synchronization primitives, such as bounded channels, locks, and event listeners. These have been split out into their own crates:

Examples

Asynchronous Tasks

Communicate between asynchronous tasks, potentially on other threads.

use async_channel::unbounded;
use async_executor::Executor;
use easy_parallel::Parallel;
use futures_lite::{future, prelude::*};
use std::time::Duration;


// Create a pair of handles.
let (mut reader, mut writer) = piper::pipe(1024);

// Create the executor.
let ex = Executor::new();
let (signal, shutdown) = unbounded::<()>();

// Spawn a detached task for random data to the pipe.
let writer = ex.spawn(async move {
    for _ in 0..1_000 {
        // Generate 8 random numnbers.
        let random = fastrand::u64(..).to_le_bytes();

        // Write them to the pipe.
        writer.write_all(&random).await.unwrap();

        // Wait a bit.
        async_io::Timer::after(Duration::from_millis(5)).await;
    }

    // Drop the writer to close the pipe.
    drop(writer);
});

// Detach the task so that it runs in the background.
writer.detach();

// Spawn a task for reading from the pipe.
let reader = ex.spawn(async move {
    let mut buf = vec![];

    // Read all bytes from the pipe.
    reader.read_to_end(&mut buf).await.unwrap();

    println!("Random data: {:#?}", buf);
});

Parallel::new()
    // Run four executor threads.
    .each(0..4, |_| future::block_on(ex.run(shutdown.recv())))
    // Run the main future on the current thread.
    .finish(|| future::block_on(async {
        // Wait for the reader to finish.
        reader.await;

        // Signal the executor threads to shut down.
        drop(signal);
    }));

Blocking I/O

File I/O is blocking; therefore, in async code, you must run it on another thread. This example spawns another thread for reading a file and writing it to a pipe.

use futures_lite::{future, prelude::*};
use std::fs::File;
use std::io::prelude::*;
use std::thread;

// Create a pair of handles.
let (mut r, mut w) = piper::pipe(1024);

// Spawn a thread for reading a file.
thread::spawn(move || {
    let mut file = File::open("Cargo.toml").unwrap();

    // Read the file into a buffer.
    let mut buf = [0u8; 16384];
    future::block_on(async move {
        loop {
            // Read a chunk of bytes from the file.
            // Blocking is okay here, since this is a separate thread.
            let n = file.read(&mut buf).unwrap();
            if n == 0 {
                break;
            }

            // Write the chunk to the pipe.
            w.write_all(&buf[..n]).await.unwrap();
        }

        // Close the pipe.
        drop(w);
    });
});

// Read bytes from the pipe.
let mut buf = vec![];
r.read_to_end(&mut buf).await.unwrap();

println!("Read {} bytes", buf.len());

However, the lower-level poll_fill and poll_drain methods take impl Read and impl Write arguments, respectively. This allows you to skip the buffer entirely and read/write directly from the file into the pipe. This approach should be preferred when possible, as it avoids an extra copy.

// In the `future::block_on` call above...
loop {
    let n = future::poll_fn(|cx| w.poll_fill(cx, &mut file)).await.unwrap();
    if n == 0 {
        break;
    }
}

The blocking crate is preferred in this use case, since it uses more efficient strategies for thread management and pipes.

Structs

  • The reading side of a pipe.
  • The writing side of a pipe.

Functions

  • Creates a bounded single-producer single-consumer pipe.